+use std::collections::HashMap;
use std::hash::Hasher;
use std::hash::sip::SipHasher;
use std::io::{File, IoError};
use std::io;
use std::os::args;
use std::str;
+use std::sync::{atomics, Arc};
use term::color::YELLOW;
use core::{Package, PackageSet, Target, Resolve};
dylib: (String, String)
}
-type Job = proc():Send -> CargoResult<()>;
+enum Job {
+ Work(proc():Send -> CargoResult<Vec<Job>>),
+}
+
+impl Job {
+ fn all(jobs: Vec<Job>, after: Vec<Job>) -> Job {
+ Work(proc() {
+ for Work(job) in jobs.move_iter() {
+ try!(job());
+ }
+ Ok(after)
+ })
+ }
+}
// This is a temporary assert that ensures the consistency of the arguments
// given the current limitations of Cargo. The long term fix is to have each
}
}).collect::<Vec<&Target>>();
- jobs.push((dep, try!(compile(targets.as_slice(), dep, &mut cx))));
+ try!(compile(targets.as_slice(), dep, &mut cx, &mut jobs));
}
cx.primary = true;
cx.dest = &target_dir;
- jobs.push((pkg, try!(compile(targets, pkg, &mut cx))));
+ try!(compile(targets, pkg, &mut cx, &mut jobs));
// Now that we've figured out everything that we're going to do, do it!
execute(cx.config, jobs)
}
-fn compile(targets: &[&Target], pkg: &Package,
- cx: &mut Context) -> CargoResult<(Freshness, Job)> {
+fn compile<'a>(targets: &[&Target], pkg: &'a Package, cx: &mut Context,
+ jobs: &mut Vec<(&'a Package, Freshness, Job)>) -> CargoResult<()> {
debug!("compile_pkg; pkg={}; targets={}", pkg, targets);
if targets.is_empty() {
- return Ok((Fresh, proc() Ok(())))
+ return Ok(())
}
// First check to see if this package is fresh.
// This is not quite accurate, we should only trigger forceful
// recompilations for downstream dependencies of ourselves, not everyone
// compiled afterwards.a
- //
- // TODO: Figure out how this works with targets
let fingerprint_loc = cx.dest.join(format!(".{}.fingerprint",
pkg.get_name()));
let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx,
targets));
- let mut cmds = Vec::new();
-
+ // First part of the build step of a target is to execute all of the custom
+ // build commands.
+ //
// TODO: Should this be on the target or the package?
+ let mut build_cmds = Vec::new();
for build_cmd in pkg.get_manifest().get_build().iter() {
- cmds.push(compile_custom(pkg, build_cmd.as_slice(), cx));
+ build_cmds.push(compile_custom(pkg, build_cmd.as_slice(), cx));
}
// After the custom command has run, execute rustc for all targets of our
// package.
+ //
+ // Note that bins can all be built in parallel because they all depend on
+ // one another, but libs must be built sequentially because they may have
+ // interdependencies.
+ let mut libs = Vec::new();
+ let mut bins = Vec::new();
for &target in targets.iter() {
- cmds.push(rustc(pkg, target, cx));
+ let job = rustc(pkg, target, cx);
+ if target.is_lib() {
+ libs.push(job);
+ } else {
+ bins.push(job);
+ }
}
- cmds.push(proc() {
- // If this job runs, then everything has successfully compiled, so write
- // our new fingerprint to the relevant location to prevent
- // recompilations in the future.
- try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice()));
- Ok(())
- });
+ // Only after all the binaries have been built can we actually write the
+ // fingerprint. Currently fingerprints are transactionally done per package,
+ // not per-target.
+ //
+ // TODO: Can a fingerprint be per-target instead of per-package? Doing so
+ // would likely involve altering the granularity of key for the
+ // dependency queue that is later used to run jobs.
+ let state = Arc::new(atomics::AtomicUint::new(bins.len()));
+ let write_fingerprint = || {
+ let (my_state, fingerprint_loc, fingerprint) =
+ (state.clone(), fingerprint_loc.clone(), fingerprint.clone());
+ Work(proc() {
+ if my_state.load(atomics::SeqCst) == 0 {
+ let mut file = try!(File::create(&fingerprint_loc));
+ try!(file.write_str(fingerprint.as_slice()));
+ }
+ Ok(Vec::new())
+ })
+ };
- // TODO: this job itself may internally be parallel, but we're hiding that
- // currently. How to expose the parallelism among a single target?
- Ok((if is_fresh {Fresh} else {Dirty}, proc() {
- for cmd in cmds.move_iter() {
- try!(cmd());
- }
- Ok(())
- }))
+ // Note that we build the job backwards because each job will produce more
+ // work.
+ let build_libs = if bins.len() == 0 {
+ Job::all(libs, vec![write_fingerprint()])
+ } else {
+ Job::all(libs, bins.move_iter().map(|Work(bin)| {
+ let my_state = state.clone();
+ let write = write_fingerprint();
+ Work(proc() {
+ try!(bin());
+ my_state.fetch_sub(1, atomics::SeqCst);
+ Ok(vec![write])
+ })
+ }).collect())
+ };
+ let job = Job::all(build_cmds, vec![build_libs]);
+
+ jobs.push((pkg, if is_fresh {Fresh} else {Dirty}, job));
+ Ok(())
}
fn is_fresh(dep: &Package, loc: &Path,
for arg in cmd {
p = p.arg(arg);
}
- proc() p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human())
+ Work(proc() {
+ try!(p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human()));
+ Ok(Vec::new())
+ })
}
fn rustc(package: &Package, target: &Target, cx: &mut Context) -> Job {
log!(5, "command={}", rustc);
- let _ = cx.config.shell().verbose(|shell| shell.status("Running", rustc.to_string()));
+ let _ = cx.config.shell().verbose(|shell| {
+ shell.status("Running", rustc.to_string())
+ });
- proc() {
+ Work(proc() {
if primary {
log!(5, "executing primary");
- rustc.exec().map_err(|err| human(err.to_string()))
+ try!(rustc.exec().map_err(|err| human(err.to_string())))
} else {
log!(5, "executing deps");
- rustc.exec_with_output().and(Ok(())).map_err(|err| {
+ try!(rustc.exec_with_output().and(Ok(())).map_err(|err| {
human(err.to_string())
- })
+ }))
}
- }
+ Ok(Vec::new())
+ })
}
fn prepare_rustc(package: &Package, target: &Target, crate_types: Vec<&str>,
/// necessary dependencies, in order. Freshness is propagated as far as possible
/// along each dependency chain.
fn execute(config: &mut Config,
- jobs: Vec<(&Package, (Freshness, Job))>) -> CargoResult<()> {
+ jobs: Vec<(&Package, Freshness, Job)>) -> CargoResult<()> {
let pool = TaskPool::new(config.jobs());
let (tx, rx) = channel();
let mut queue = DependencyQueue::new();
- for &(pkg, _) in jobs.iter() {
+ for &(pkg, _, _) in jobs.iter() {
queue.register(pkg);
}
- for (pkg, (fresh, job)) in jobs.move_iter() {
+ for (pkg, fresh, job) in jobs.move_iter() {
queue.enqueue(pkg, fresh, (pkg, job));
}
// Iteratively execute the dependency graph. Each turn of this loop will
// schedule as much work as possible and then wait for one job to finish,
// possibly scheduling more work afterwards.
- let mut active = 0i;
+ let mut active = HashMap::new();
while queue.len() > 0 {
loop {
match queue.dequeue() {
Some((name, Fresh, (pkg, _))) => {
+ assert!(active.insert(name.clone(), 1u));
try!(config.shell().status("Fresh", pkg));
- tx.send((name, Fresh, Ok(())));
+ tx.send((name, Fresh, Ok(Vec::new())));
}
- Some((name, Dirty, (pkg, job))) => {
+ Some((name, Dirty, (pkg, Work(job)))) => {
+ assert!(active.insert(name.clone(), 1));
try!(config.shell().status("Compiling", pkg));
let my_tx = tx.clone();
pool.execute(proc() my_tx.send((name, Dirty, job())));
// Now that all possible work has been scheduled, wait for a piece of
// work to finish. If any package fails to build then we stop scheduling
// work as quickly as possibly.
- active -= 1;
- match rx.recv() {
- (name, fresh, Ok(())) => queue.finish(&name, fresh),
- (_, _, Err(e)) => {
- if active > 0 && config.jobs() > 1 {
+ let (name, fresh, result) = rx.recv();
+ *active.get_mut(&name) -= 1;
+ match result {
+ Ok(v) => {
+ for Work(job) in v.move_iter() {
+ *active.get_mut(&name) += 1;
+ let my_tx = tx.clone();
+ let my_name = name.clone();
+ pool.execute(proc() {
+ my_tx.send((my_name, fresh, job()));
+ });
+ }
+ if *active.get(&name) == 0 {
+ active.remove(&name);
+ queue.finish(&name, fresh);
+ }
+ }
+ Err(e) => {
+ if *active.get(&name) == 0 {
+ active.remove(&name);
+ }
+ if active.len() > 0 && config.jobs() > 1 {
try!(config.shell().say("Build failed, waiting for other \
jobs to finish...", YELLOW));
for _ in rx.iter() {}